﻿// See https://aka.ms/new-console-template for more information
using System.Buffers;
using System.IO.Pipelines;
using System.Text;

namespace PipelinesDemo // Note: actual namespace depends on the project name.
{
    /// <summary>
    /// Console demo that pushes timestamped text lines through an in-memory
    /// <see cref="Pipe"/> and prints every complete line that is read back.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Entry point: prints the menu, reads a single key and runs the selected demo.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        public static void Main(string[] args)
        {
            Console.WriteLine("1:内存Pipe测试");
            var type = Console.ReadKey().KeyChar;
            switch (type)
            {
                case '1':
                    // Main is synchronous, so blocking is unavoidable here.
                    // GetAwaiter().GetResult() rethrows the original exception,
                    // whereas Wait() would wrap it in an AggregateException.
                    MemoryPipeLinesAsync().GetAwaiter().GetResult();
                    break;
            }
        }

        /// <summary>
        /// Runs a writer and a reader concurrently over one in-memory pipe.
        /// The writer emits one newline-terminated message per second; the reader
        /// prints each complete line it finds in the pipe.
        /// </summary>
        /// <returns>A task that completes when both the writer and the reader have stopped.</returns>
        private static async Task MemoryPipeLinesAsync()
        {
            var pipe = new Pipe();
            Task writing = FillPipeAsync(pipe.Writer);
            Task reading = ReadPipeAsync(pipe.Reader);

            await Task.WhenAll(reading, writing);

            // Producer: encodes one message per iteration into the pipe until the
            // reader side reports completion or encoding fails.
            async Task FillPipeAsync(PipeWriter writer)
            {
                try
                {
                    while (true)
                    {
                        try
                        {
                            var msg = $"MemoryPipeLinesAsync:{DateTime.Now}\n";

                            // Ask for a buffer that is guaranteed to hold the whole
                            // message, then encode straight into it (no temporary array).
                            int byteCount = Encoding.UTF8.GetByteCount(msg);
                            Memory<byte> memory = writer.GetMemory(byteCount);
                            int written = Encoding.UTF8.GetBytes(msg, memory.Span);
                            writer.Advance(written);
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                            break;
                        }

                        // Make the data available to the PipeReader.
                        FlushResult result = await writer.FlushAsync();

                        // The reader has completed; nothing will consume further data.
                        if (result.IsCompleted)
                        {
                            break;
                        }

                        await Task.Delay(1000);
                    }
                }
                finally
                {
                    // Always complete the writer, even when FlushAsync throws;
                    // otherwise the reader would wait for data forever.
                    await writer.CompleteAsync();
                }
            }

            // Consumer: prints every complete line currently in the pipe and leaves
            // any trailing partial line buffered for the next read.
            async Task ReadPipeAsync(PipeReader reader)
            {
                try
                {
                    while (true)
                    {
                        ReadResult result = await reader.ReadAsync();
                        ReadOnlySequence<byte> buffer = result.Buffer;

                        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
                        {
                            // Decode directly from the (possibly multi-segment) sequence
                            // instead of copying it into an intermediate array first.
                            Console.WriteLine(Encoding.UTF8.GetString(line));
                        }

                        // Consumed: everything up to the first unprocessed byte.
                        // Examined: the whole buffer, so the next ReadAsync only
                        // returns once more data has arrived.
                        reader.AdvanceTo(buffer.Start, buffer.End);

                        // Stop reading if there's no more data coming.
                        if (result.IsCompleted)
                        {
                            break;
                        }

                        // NOTE(review): ReadAsync already waits for new data, so this delay
                        // only throttles the reader - presumably intentional for the demo; confirm.
                        await Task.Delay(1000);
                    }
                }
                finally
                {
                    // Always complete the reader, even on failure, so the writer's
                    // FlushAsync reports completion and the writer loop can stop.
                    await reader.CompleteAsync();
                }
            }

            // Tries to cut one '\n'-terminated line off the front of `buffer`.
            // On success `line` holds the bytes before the '\n', `buffer` is moved past
            // the '\n', and true is returned. Otherwise `buffer` is left untouched,
            // `line` is set to default and false is returned.
            static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
            {
                // Look for an EOL in the buffer.
                SequencePosition? position = buffer.PositionOf((byte)'\n');

                if (position is null)
                {
                    line = default;
                    return false;
                }

                // Take the line, then skip past the line and its '\n'.
                line = buffer.Slice(0, position.Value);
                buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
                return true;
            }
        }
    }
}